package serveryt

import (
	"fmt"
	"gitee.com/zhancaihua/goyt/core/discover"
	"gitee.com/zhancaihua/goyt/core/logyt"
	"gitee.com/zhancaihua/goyt/core/proc"
	"gitee.com/zhancaihua/goyt/serveryt/middleware"
	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
	"time"
)

const defaultConnectionIdleDuration = time.Minute * 5

type RpcServer struct {
	s         *ServerYT
	name      string
	srvOption rpcServerOptions
}
type rpcServerOptions struct {
	health      bool
	middlewares ServerMiddlewaresConf
	name        string
	addr        string
}
type RpcServerOption func(*rpcServerOptions)
type ServerMiddlewaresConf struct {
	Recover bool `mapstructure:"recover" json:"recover,omitempty"`
}

//srvOption

func WithMiddleware(conf ServerMiddlewaresConf) RpcServerOption {
	return func(options *rpcServerOptions) {
		options.middlewares = conf
	}
}
func WithHealth() RpcServerOption {
	return func(options *rpcServerOptions) {
		options.health = true
	}
}
func WithName(name string) RpcServerOption {
	return func(options *rpcServerOptions) {
		options.name = name
	}
}
func WithAddr(addr string) RpcServerOption {
	return func(options *rpcServerOptions) {
		options.addr = addr
	}
}

//server func

func newRpcServer(options ...RpcServerOption) *RpcServer {

	var op rpcServerOptions
	for _, option := range options {
		option(&op)
	}
	s := NewServerYT()
	s.ServeAt(op.addr)
	// 一些默认配置
	s.WithGrpcOption(grpc.KeepaliveParams(keepalive.ServerParameters{
		MaxConnectionIdle: defaultConnectionIdleDuration,
	}))
	return &RpcServer{
		s:         s,
		srvOption: op,
	}
}
func (s *RpcServer) Start(fn RegisterFn) error {
	// 配置转option
	unaryInterceptorOption := grpc.ChainUnaryInterceptor(buildUnaryInterceptors(s)...)
	streamInterceptorOption := grpc.ChainStreamInterceptor(buildStreamInterceptors(s)...)
	s.s.WithGrpcOption(unaryInterceptorOption, streamInterceptorOption)
	if s.srvOption.health {
		s.s.EnableHealthCheck()
	}
	return s.s.Start(fn)
}

func (s *RpcServer) StartAsync(fn RegisterFn) (<-chan error, error) {
	// 配置转option
	unaryInterceptorOption := grpc.ChainUnaryInterceptor(buildUnaryInterceptors(s)...)
	streamInterceptorOption := grpc.ChainStreamInterceptor(buildStreamInterceptors(s)...)
	s.s.WithGrpcOption(unaryInterceptorOption, streamInterceptorOption)
	if s.srvOption.health {
		s.s.EnableHealthCheck()
	}
	return s.s.StartAsync(fn)
}

func (s *RpcServer) Stop() error {
	return s.s.Stop()
}
func (s *RpcServer) StopNow() error {
	return s.s.StopNow()
}

func buildStreamInterceptors(s *RpcServer) []grpc.StreamServerInterceptor {
	var interceptors []grpc.StreamServerInterceptor
	middle := s.srvOption.middlewares

	if middle.Recover {
		interceptors = append(interceptors, middleware.StreamRecoverInterceptor)
	}
	return interceptors
}

func buildUnaryInterceptors(s *RpcServer) []grpc.UnaryServerInterceptor {
	var interceptors []grpc.UnaryServerInterceptor
	middle := s.srvOption.middlewares

	if middle.Recover {
		interceptors = append(interceptors, middleware.UnaryRecoverInterceptor)
	}
	return interceptors
}

type Visitor interface {
	//Visit never change RpcServer
	Visit(*RpcServer) error
}

func (s *RpcServer) Visit(visitor Visitor) error {
	return visitor.Visit(s)
}

type ServerPubVisitor struct {
	etcd discover.EtcdConf
}

func NewServerPubVisitor(etcd discover.EtcdConf) Visitor {
	return &ServerPubVisitor{etcd: etcd}
}
func (v *ServerPubVisitor) Visit(s *RpcServer) error {
	etcd := v.etcd
	var pubOpts []discover.PubOption
	if etcd.HasAccount() {
		pubOpts = append(pubOpts, discover.WithPubEtcdAccount(etcd.User, etcd.Pass))
	}
	if etcd.HasID() {
		pubOpts = append(pubOpts, discover.WithId(etcd.ID))
	}

	pubClient := discover.NewPublisher(etcd.Hosts, etcd.Key, s.srvOption.addr, pubOpts...)
	return pubClient.KeepAlive()
}

type RpcServerConf struct {
	Name     string            `mapstructure:"name" json:"name,omitempty"`
	ListenOn string            `mapstructure:"listen_on" json:"listen_on,omitempty"`
	Etcd     discover.EtcdConf `mapstructure:"etcd" json:"etcd,omitempty"`
	// grpc health check switch
	Health      bool                  `mapstructure:"health" json:"health,omitempty"`
	Middlewares ServerMiddlewaresConf `mapstructure:"middlewares" json:"middlewares,omitempty"`
}

func (sc RpcServerConf) HasEtcd() bool {
	return len(sc.Etcd.Hosts) > 0 && len(sc.Etcd.Key) > 0
}

// SetUpServer 新建服务实例，并初始化日志，注册服务等项目
func SetUpServer(c RpcServerConf) (*RpcServer, error) {
	var err error

	//初始化服务
	var rpcServerOption []RpcServerOption
	rpcServerOption = append(rpcServerOption, WithName(c.Name), WithAddr(c.ListenOn), WithMiddleware(c.Middlewares))
	if c.Health {
		rpcServerOption = append(rpcServerOption, WithHealth())
	}
	rpcServer := newRpcServer(rpcServerOption...)

	//注册
	if c.HasEtcd() {
		err = rpcServer.Visit(NewServerPubVisitor(c.Etcd))
		if nil != err {
			return nil, fmt.Errorf("error when publishing to etcd %w", err)
		}
	}
	return rpcServer, nil
}

// Start 启动服务,并注册全局回收监听。此方法在服务退出后，会继续等待所有监听器执行完毕
func Start(s *RpcServer, fn RegisterFn) (e error) {
	//回收监听
	waitForCalled := proc.AddWrapUpListener(func() {
		_ = s.Stop()
		logyt.InfoLog("server stopped")
	})
	defer func() {
		if a := recover(); nil != a {
			logyt.ErrorLog("server stop", logyt.LogField{
				Key:   "err",
				Value: a,
			})
		}
		if nil != e {
			logyt.ErrorLog("server stop", logyt.LogField{
				Key:   "err",
				Value: e,
			})
		}
		//等待所有资源回收，继续等待所有监听器执行完毕
		waitForCalled()
	}()

	e = s.Start(fn)
	return
}

// ShutDown 由于服务器目前会被信号主动关闭，所以当前无事可做。你可以用他收集一些回调方法统一调用
func ShutDown(s *RpcServer, fs ...func(*RpcServer)) {
	for _, f := range fs {
		f(s)
	}
}
